#!/bin/bash
#
# End-to-end test for point-in-time recovery (PITR) error handling. The
# scenarios defined in this file create a gap in the binary logs and make a
# binary log invalid, then check how backups and restores react.

# Abort on the first unhandled command failure.
set -o errexit

# Absolute directory of this script. Both expansions are quoted so that a
# checkout path containing whitespace or glob characters is not word-split.
test_dir=$(realpath "$(dirname "$0")")
# Helper functions used throughout this file are provided by the sourced file.
. "${test_dir}/../functions"

set_debug

# Extended regular expression for one GTID: "<uuid>:<transaction number>".
GTID_PATTERN='[A-F0-9a-f]{8}-[A-F0-9a-f]{4}-[A-F0-9a-f]{4}-[A-F0-9a-f]{4}-[A-F0-9a-f]{12}:[0-9]+'

# PITR is not supported on 5.7 images: report that and finish successfully.
if [[ $IMAGE_PXC =~ 5\.7 ]]; then
	echo "Skipping PITR test because 5.7 doesn't support it!"
	exit 0
fi

# Create the test.test table through the cluster proxy, insert rows
# 100500-100502 and verify that every PXC pod returns them.
#
# Globals:
#   IMAGE_PXC (read) - decides whether the ENCRYPTION table option is added
# Arguments:
#   $1 - cluster name
#   $3 - number of PXC pods to verify (default: 3)
#   $2, $4, $5, $6 - still accepted for backward compatibility, but unused
write_test_data() {
	local cluster=$1
	local size="${3:-3}"
	local proxy
	# Always start from an empty value so that a stale `encrypt` from the
	# environment or from an earlier call can never end up in the SQL below.
	local encrypt=""
	local i

	# Assigned separately from `local` so a failing helper is not masked.
	proxy=$(get_proxy "$cluster")

	desc 'write test data'
	if [[ $IMAGE_PXC =~ 5\.7 ]] && [[ "$(is_keyring_plugin_in_use "$cluster")" ]]; then
		encrypt='ENCRYPTION=\"Y\"'
	fi
	run_mysql \
		"CREATE DATABASE IF NOT EXISTS test; use test; CREATE TABLE IF NOT EXISTS test (id int PRIMARY KEY) $encrypt;" \
		"-h $proxy -uroot -proot_password"
	run_mysql \
		'INSERT test.test (id) VALUES (100500); INSERT test.test (id) VALUES (100501); INSERT test.test (id) VALUES (100502);' \
		"-h $proxy -uroot -proot_password"
	# Fixed pause before the per-pod comparison (presumably to let the rows
	# reach every node - confirm).
	sleep 30
	for ((i = 0; i < size; i++)); do
		compare_mysql_cmd "select-3" "SELECT * from test.test;" "-h $cluster-pxc-$i.$cluster-pxc -uroot -proot_password"
	done

	if [ "$(is_keyring_plugin_in_use "$cluster")" ]; then
		table_must_be_encrypted "$cluster" "test"
	fi
}

# Insert rows 100503-100505 into test.test through the proxy of a cluster.
#
# Arguments:
#   $1 - cluster name
write_data_for_pitr() {
	local cluster=$1
	local proxy=$(get_proxy "$cluster")
	local pitr_insert_sql='INSERT test.test (id) VALUES (100503); INSERT test.test (id) VALUES (100504); INSERT test.test (id) VALUES (100505);'
	local pitr_conn_args="-h $proxy -uroot -proot_password"

	desc "write data for pitr"
	run_mysql "$pitr_insert_sql" "$pitr_conn_args"
}

# Insert rows 100506-100510 into test.test through the proxy of a cluster.
#
# Arguments:
#   $1 - cluster name
write_more_data() {
	local cluster=$1
	local proxy=$(get_proxy "$cluster")
	local extra_insert_sql='INSERT test.test (id) VALUES (100506); INSERT test.test (id) VALUES (100507); INSERT test.test (id) VALUES (100508); INSERT test.test (id) VALUES (100509); INSERT test.test (id) VALUES (100510);'
	local extra_conn_args="-h $proxy -uroot -proot_password"

	desc "write extra data"
	run_mysql "$extra_insert_sql" "$extra_conn_args"
}

# Produce a hole in the collected binary logs: switch PITR off, write rows,
# rotate and purge the binlogs on all three PXC pods, switch PITR back on and
# write/rotate again.
#
# Globals:
#   cluster (read) - cluster name
#   proxy   (read) - host used for the first batch of inserts
# Arguments:
#   none
create_binlog_gap() {
	local gap_local_args="-h127.0.0.1 -P3306 -uroot -proot_password"
	local gap_pod_idx

	desc 'create binlog gap'

	kubectl patch pxc $cluster --type=merge -p '{"spec":{"backup":{"pitr":{"enabled":false}}}}'
	sleep 5 # wait for pitr pod to shutdown

	# Rows written while PITR is off: these are the ones that will be lost.
	run_mysql \
		'INSERT test.gap (id) VALUES (100800); INSERT test.gap (id) VALUES (100801); INSERT test.gap (id) VALUES (100802);' \
		"-h $proxy -uroot -proot_password"

	# Rotate the binlogs on every pod first, and only then purge them.
	for gap_pod_idx in 0 1 2; do
		run_mysql_local 'FLUSH BINARY LOGS;' "$gap_local_args" "$cluster-pxc-$gap_pod_idx"
	done
	for gap_pod_idx in 0 1 2; do
		run_mysql_local 'PURGE BINARY LOGS BEFORE now();' "$gap_local_args" "$cluster-pxc-$gap_pod_idx"
	done

	# Turn PITR back on.
	kubectl patch pxc $cluster --type=merge -p '{"spec":{"backup":{"pitr":{"enabled":true}}}}'

	# Rows written after re-enabling: these will be force recovered.
	run_mysql \
		'INSERT test.gap (id) VALUES (100803); INSERT test.gap (id) VALUES (100804); INSERT test.gap (id) VALUES (100805);' \
		"-h $cluster-proxysql -uroot -proot_password"

	# Rotate once more to trigger the binlog collector.
	for gap_pod_idx in 0 1 2; do
		run_mysql_local 'FLUSH BINARY LOGS;' "$gap_local_args" "$cluster-pxc-$gap_pod_idx"
	done

	sleep 65 # wait for next PITR collect cycle and error to appear
}

# Verify that the binlog gap was reported in three places: the PITR pod log,
# the operator pod log and the conditions of backup on-pitr-minio-gap.
# Prints an error and exits the script with status 1 on the first miss.
#
# Globals:
#   OPERATOR_NS (read, optional) - namespace flag for the operator log query
# Arguments:
#   none
check_binlog_gap_error() {
	desc 'check binlog gap error'

	# grep -c exits non-zero when it counts zero matches. Assigning inside the
	# `local` statement hides that status, so execution reaches the explicit
	# checks below instead of stopping at the assignment under errexit.

	# PITR pod log: both messages are required.
	local pitr_gtid_hits=$(kubectl_bin logs $(get_pitr_pod) | grep -c "ERROR: Couldn't find the binlog that contains GTID set")
	local pitr_gap_hits=$(kubectl_bin logs $(get_pitr_pod) | grep -c "ERROR: Gap detected in the binary logs. Binary logs will be uploaded anyway, but full backup needed for consistent recovery.")
	if ((pitr_gtid_hits == 0 || pitr_gap_hits == 0)); then
		echo "ERROR: Gap error text is not found in PITR pod logs."
		exit 1
	fi

	# Operator pod log.
	local operator_gap_hits=$(kubectl_bin logs ${OPERATOR_NS:+-n$OPERATOR_NS} $(get_operator_pod) | grep -c "Gap detected in binary logs")
	if ((operator_gap_hits == 0)); then
		echo "ERROR: Gap error text is not found in operator pod logs."
		exit 1
	fi

	# Backup on-pitr-minio-gap must be marked as not ready for PITR restore.
	local unready_cond_hits=$(kubectl_bin get pxc-backup on-pitr-minio-gap -ojsonpath='{.status.conditions[]}' | grep -c '"reason":"BinlogGapDetected","status":"False","type":"PITRReady"')
	if ((unready_cond_hits == 0)); then
		echo "ERROR: Backup is not tagged as PITR unready in the backup condition."
		kubectl_bin get pxc-backup on-pitr-minio-gap -oyaml
		exit 1
	fi
}

# Run one restore scenario against the backup that has a binlog gap and check
# its outcome. PITR is disabled on the cluster before the restore is applied.
#
# Globals:
#   cluster  (read) - cluster name
#   test_dir (read) - directory holding conf/<restore>.yaml
# Arguments:
#   $1 - scenario:
#        "error"   - the restore must end as Failed and its status comments
#                    must contain the unsafe-PITR refusal text
#        "force"   - the restore must succeed and test.gap must match the
#                    expected data on all three PXC pods
#        "no-pitr" - the restore must succeed and its status comments must
#                    not contain the refusal text
# Exits the script with status 1 on a failed check or an unknown scenario.
check_binlog_gap_restore() {
	local type=$1
	local restore
	local backup_error
	local i
	# Text expected in (or required to be absent from) the restore comments.
	local unsafe_pitr_comment="Backup doesn't guarantee consistent recovery with PITR. Annotate PerconaXtraDBClusterRestore with percona.com/unsafe-pitr to force it."

	desc "check binlog gap restore: $type"
	# disable pitr
	kubectl patch pxc "$cluster" --type=merge -p '{"spec":{"backup":{"pitr":{"enabled":false}}}}'

	case "$type" in
		error)
			# try restore, check error
			restore=restore-on-pitr-minio-gap-error
			kubectl_bin apply -f "$test_dir/conf/${restore}.yaml"
			wait_backup_restore "${restore}" "Failed"
			# grep -c exits 1 on zero matches; `|| true` keeps errexit from
			# aborting here so the explicit check below can report the problem.
			backup_error=$(kubectl_bin get pxc-restore "${restore}" -ojsonpath='{.status.comments}' | grep -c "$unsafe_pitr_comment" || true)
			if [[ $backup_error -eq 0 ]]; then
				echo "ERROR: Restore ${restore} failed without the expected unsafe PITR comment in its status."
				kubectl_bin get pxc-restore "${restore}" -oyaml
				kubectl_bin get pxc-backup on-pitr-minio-gap -oyaml
				exit 1
			fi
			kubectl_bin delete -f "$test_dir/conf/${restore}.yaml"
			;;
		force)
			restore=restore-on-pitr-minio-gap-force
			kubectl_bin apply -f "$test_dir/conf/${restore}.yaml"
			wait_backup_restore "${restore}" "Succeeded"
			wait_for_running "$cluster-proxysql" 2
			wait_for_running "$cluster-pxc" 3
			wait_cluster_consistency "$cluster" 3 2
			kubectl_bin logs "job/restore-job-${restore}-${cluster}"
			for i in 0 1 2; do
				compare_mysql_cmd "select-gap" "SELECT * from test.gap;" "-h $cluster-pxc-$i.$cluster-pxc -uroot -proot_password"
			done
			kubectl_bin delete -f "$test_dir/conf/${restore}.yaml"
			;;
		no-pitr)
			restore=restore-on-pitr-minio-gap-no-pitr
			kubectl_bin apply -f "$test_dir/conf/${restore}.yaml"
			wait_backup_restore "${restore}" "Succeeded"
			backup_error=$(kubectl_bin get pxc-restore "${restore}" -ojsonpath='{.status.comments}' | grep -c "$unsafe_pitr_comment" || true)
			if [[ $backup_error -ne 0 ]]; then
				echo "ERROR: Restore without PiTR is failed because backups is tagged PiTR unready in the backup condition."
				kubectl_bin get pxc-backup on-pitr-minio-gap -oyaml
				exit 1
			fi
			# NOTE(review): unlike the other scenarios, this restore object is
			# not deleted afterwards - confirm this is intentional.
			;;
		*)
			echo "Wrong restore type!"
			exit 1
			;;
	esac
}

# Verify that the PITR pod log reports the given binlog file as invalid.
# Prints an error and exits the script with status 1 when it does not.
#
# Arguments:
#   $1 - binlog file name expected in the "is invalid" log message
check_invalid_binlogs_error() {
	local binlog=$1
	local err_text

	desc 'check invalid binlogs'

	# check error in pitr log
	# grep -c exits 1 when it counts zero matches. Without `|| true` the
	# assignment itself would fail and errexit would end the script before the
	# diagnostic below is printed. -F matches the file name literally, so the
	# "." in e.g. binlog.000007 is not treated as a regex wildcard.
	err_text=$(kubectl_bin logs "$(get_pitr_pod)" | grep -cF -- "ERROR: Binlog file $binlog is invalid" || true)
	if [[ $err_text -eq 0 ]]; then
		echo "ERROR: Invalid binlog error text is not found in PITR pod logs."
		exit 1
	fi

	# NOTE(review): this pause runs after the log has already been checked;
	# confirm whether it was meant to come before the check.
	sleep 65 # wait for next PITR collect cycle and error to appear
}

# Print the name of a binary log relative to the one that contains the
# "CREATE TABLE IF NOT EXISTS invalid" event.
#
# Arguments:
#   $1 - pod to query
#   $2 - offset in the SHOW BINARY LOGS listing, counted from the binlog that
#        holds the event (default: 0, i.e. that binlog itself)
# Outputs:
#   stdout - the selected binlog name
#   stderr - diagnostics; kept off stdout because callers capture stdout
# Exits with status 1 when no binlog holds the event or when the offset
# points outside the listing.
find_invalid_binlog_name() {
	local pod=$1
	local offset=${2:-0}
	local mysql_opts="-h127.0.0.1 -P3306 -uroot -proot_password"
	local -a binlogs=()
	local line events
	local i target

	# First column of SHOW BINARY LOGS is the file name.
	while IFS= read -r line; do
		binlogs+=("$line")
	done < <(run_mysql_local "SHOW BINARY LOGS" "$mysql_opts" "$pod" | awk '{print $1}')

	for ((i = 0; i < ${#binlogs[@]}; i++)); do
		events=$(run_mysql_local "SHOW BINLOG EVENTS IN '${binlogs[$i]}';" "$mysql_opts" "$pod")
		if grep -q "CREATE TABLE IF NOT EXISTS invalid" <<<"$events"; then
			target=$((i + offset))
			if ((target < 0 || target >= ${#binlogs[@]})); then
				echo "No binlog at offset $offset from ${binlogs[$i]}" >&2
				exit 1
			fi
			echo "${binlogs[$target]}"
			return
		fi
	done

	echo "No invalid binlog found" >&2
	exit 1
}

# Reduce a GTID set of the form "<uuid>:<first>-<last>" to the single GTID
# "<uuid>:<last>". A set without a range ("<uuid>:<n>") is printed unchanged.
# Only the first line of the input and the first interval after the UUID are
# considered.
#
# Arguments:
#   $1 - GTID set
# Outputs:
#   stdout - the resulting GTID
gtidset_to_gtid() {
	local gtidset=$1
	local -a parts
	local number_part

	# Split on ":" -> parts[0] is the UUID, parts[1] the first interval.
	IFS=':' read -ra parts <<<"$gtidset"

	# Keep what follows the "-" (the whole field when there is no "-").
	number_part="${parts[1]#*-}"

	echo "${parts[0]}:$number_part"
}

# Scenario: one binary log is overwritten on every PXC pod, PITR is pointed at
# a separate bucket and re-enabled, the PITR pod log is checked for the
# "invalid binlog" error, and finally a PITR restore to a GTID taken from the
# following binlog is verified.
#
# Globals:
#   cluster      (read) - cluster name
#   test_dir     (read) - directory holding conf/restore-on-pitr-minio.yaml
#   GTID_PATTERN (read) - regex the restore target GTID must match
# Arguments:
#   none
# Exits the script with status 1 when the computed GTID is malformed.
invalid_binlog_test() {
	desc 'start invalid binlog test'

	local proxy
	local backup
	local binlog
	local invalid_binlog gtidset
	local next_binlog next_gtidset next_gtid
	local gtid

	proxy="$(get_proxy "$cluster")"

	kubectl patch pxc $cluster --type=merge -p '{"spec":{"backup":{"pitr":{"enabled":false}}}}'
	sleep 5 # wait for pitr pod to shutdown

	# restore to initial state
	# The restore manifest is reused with its pitr and backupSource sections
	# removed and backupName pointing at the first backup.
	backup="on-pitr-minio"
	yq "$test_dir/conf/restore-on-pitr-minio.yaml" \
		| yq eval 'del(.spec.pitr)' \
		| yq eval 'del(.spec.backupSource)' \
		| yq eval ".spec.backupName=\"$backup\"" \
		| kubectl_bin apply -f -
	wait_backup_restore "restore-${backup}"
	wait_for_running "$cluster-proxysql" 2
	wait_for_running "$cluster-pxc" 3
	wait_cluster_consistency "$cluster" 3 2

	run_mysql \
		"CREATE DATABASE IF NOT EXISTS test; use test; " \
		"-h $proxy -uroot -proot_password"
	write_test_data "$cluster"
	write_data_for_pitr "$cluster"

	# we need to use this function in test
	# NOTE(review): get_gtid_set_by_binlog, used further down, is not created
	# here; presumably it already exists on the servers - confirm.
	run_mysql_local "CREATE FUNCTION get_binlog_by_gtid_set RETURNS STRING SONAME 'binlog_utils_udf.so';" \
		"-h127.0.0.1 -P3306 -uroot -proot_password" "$cluster-pxc-0"
	run_mysql_local "CREATE FUNCTION get_binlog_by_gtid_set RETURNS STRING SONAME 'binlog_utils_udf.so';" \
		"-h127.0.0.1 -P3306 -uroot -proot_password" "$cluster-pxc-1"
	run_mysql_local "CREATE FUNCTION get_binlog_by_gtid_set RETURNS STRING SONAME 'binlog_utils_udf.so';" \
		"-h127.0.0.1 -P3306 -uroot -proot_password" "$cluster-pxc-2"
	# flush binlogs
	run_mysql_local 'FLUSH BINARY LOGS;FLUSH BINARY LOGS;' "-h127.0.0.1 -P3306 -uroot -proot_password" "$cluster-pxc-0"
	run_mysql_local 'FLUSH BINARY LOGS;FLUSH BINARY LOGS;' "-h127.0.0.1 -P3306 -uroot -proot_password" "$cluster-pxc-1"
	run_mysql_local 'FLUSH BINARY LOGS;FLUSH BINARY LOGS;' "-h127.0.0.1 -P3306 -uroot -proot_password" "$cluster-pxc-2"

	# this data should be in new binlog
	run_mysql \
		"USE test; CREATE TABLE IF NOT EXISTS invalid (id int PRIMARY KEY);" \
		"-h $proxy -uroot -proot_password"
	run_mysql \
		'INSERT test.invalid (id) VALUES (100900);
		 INSERT test.invalid (id) VALUES (100901);
		 INSERT test.invalid (id) VALUES (100902);' \
		"-h $proxy -uroot -proot_password"

	# flush binlogs
	run_mysql_local 'FLUSH BINARY LOGS;' "-h127.0.0.1 -P3306 -uroot -proot_password" "$cluster-pxc-0"
	run_mysql_local 'FLUSH BINARY LOGS;' "-h127.0.0.1 -P3306 -uroot -proot_password" "$cluster-pxc-1"
	run_mysql_local 'FLUSH BINARY LOGS;' "-h127.0.0.1 -P3306 -uroot -proot_password" "$cluster-pxc-2"

	write_more_data "$cluster" # this data should be in new binlog

	# Binlog on pod 0 that holds the CREATE TABLE for test.invalid.
	invalid_binlog=$(find_invalid_binlog_name "$cluster-pxc-0")
	# get gtidset of invalid binlog
	gtidset=$(run_mysql_local "SELECT get_gtid_set_by_binlog('$invalid_binlog');" "-h127.0.0.1 -P3306 -uroot -proot_password" "$cluster-pxc-0")

	# The binlog right after it supplies the GTID used as the restore target.
	next_binlog=$(find_invalid_binlog_name "$cluster-pxc-0" 1)
	next_gtidset=$(run_mysql_local "SELECT get_gtid_set_by_binlog('$next_binlog');" "-h127.0.0.1 -P3306 -uroot -proot_password" "$cluster-pxc-0")
	next_gtid=$(gtidset_to_gtid "$next_gtidset")

	# we should make binlogs with this gtidset empty
	# Pod 0 uses the file name found above; on pods 1 and 2 the file is looked
	# up by GTID set. Each file is overwritten with a single newline.
	binlog="$invalid_binlog"
	kubectl exec $cluster-pxc-0 -- bash -c "echo \"\" > /var/lib/mysql/$binlog"
	binlog=$(run_mysql_local "SELECT get_binlog_by_gtid_set('$gtidset');" "-h127.0.0.1 -P3306 -uroot -proot_password" "$cluster-pxc-1")
	kubectl exec $cluster-pxc-1 -- bash -c "echo \"\" > /var/lib/mysql/$binlog"
	binlog=$(run_mysql_local "SELECT get_binlog_by_gtid_set('$gtidset');" "-h127.0.0.1 -P3306 -uroot -proot_password" "$cluster-pxc-2")
	kubectl exec $cluster-pxc-2 -- bash -c "echo \"\" > /var/lib/mysql/$binlog"

	sleep 20

	# Point the minio-binlogs storage at a separate bucket path.
	kubectl patch pxc $cluster --type=merge -p '{"spec": {"backup": {"storages": {"minio-binlogs": {"s3": {"bucket": "operator-testing/binlogs-invalid"}}}}}}'
	# re-enable pitr
	kubectl patch pxc $cluster --type=merge -p '{"spec":{"backup":{"pitr":{"enabled":true}}}}'
	sleep 180 # wait for next PITR collect cycle and error to appear
	check_invalid_binlogs_error "$invalid_binlog"

	gtid="$next_gtid"

	if [[ ! ${gtid} =~ ${GTID_PATTERN} ]]; then
		# The value is passed as an argument rather than embedded in the
		# format string, so it fills %s and cannot be read as a directive.
		printf 'Some garbage --> %s <-- instead of legit GTID. Exiting\n' "${gtid}"
		exit 1
	fi

	run_recovery_check_pitr "$cluster" "restore-on-pitr-minio-invalid" "on-pitr-minio" "select-6" "" "" "$gtid"

	# invalid table should not exist
	compare_mysql_cmd "select-invalid" "SELECT * from test.invalid;" "-h $cluster-pxc-0.$cluster-pxc -uroot -proot_password"
	compare_mysql_cmd "select-invalid" "SELECT * from test.invalid;" "-h $cluster-pxc-1.$cluster-pxc -uroot -proot_password"
	compare_mysql_cmd "select-invalid" "SELECT * from test.invalid;" "-h $cluster-pxc-2.$cluster-pxc -uroot -proot_password"

	desc 'done test invalid binlogs'
}

# Entry point: sets up the environment and the "pitr-gap-errors" cluster, then
# runs a regular PITR backup/restore check, the binlog gap scenario and the
# invalid binlog scenario in that order.
#
# Globals:
#   namespace (read)    - passed to create_infra and destroy
#   test_dir  (read)    - location of the conf/ manifests
#   cluster   (written) - deliberately global; create_binlog_gap,
#                         check_binlog_gap_restore and invalid_binlog_test
#                         read it
#   proxy     (written) - deliberately global; create_binlog_gap reads it
#   dest      (written) - destination of backup on-pitr-minio1 with "/" escaped
main() {
	create_infra $namespace
	deploy_cert_manager
	kubectl_bin apply -f "$test_dir/conf/issuer.yml"
	kubectl_bin apply -f "$test_dir/conf/cert.yml"
	sleep 25
	# We are using minio with tls enabled to check if `verifyTLS: false` works fine
	start_minio "tls-minio"

	cluster="pitr-gap-errors"
	spinup_pxc "$cluster" "$test_dir/conf/$cluster.yml"

	# First backup; invalid_binlog_test later restores from it.
	run_backup "$cluster" "on-pitr-minio"

	write_test_data "$cluster"

	desc 'show binlog events'
	proxy=$(get_proxy "$cluster")
	# The output of these two queries is not checked (presumably diagnostic
	# only - confirm).
	run_mysql "SHOW BINLOG EVENTS IN 'binlog.000005';" "-h ${proxy} -uroot -proot_password"
	run_mysql "SHOW BINLOG EVENTS IN 'binlog.000006';" "-h ${proxy} -uroot -proot_password"

	write_data_for_pitr "$cluster"
	sleep 120 # need to wait while collector catch new data

	desc 'check second backup/restore data from binlogs'
	run_backup "$cluster" "on-pitr-minio1"
	write_more_data "$cluster"
	# Backup destination with every "/" turned into "\/".
	dest=$(sed 's,/,\\/,g' <<<$(kubectl get pxc-backup on-pitr-minio1 -o jsonpath='{.status.destination}'))
	sleep 80 # need to wait while collector catch new data
	run_recovery_check_pitr "$cluster" "restore-on-pitr-minio1" "on-pitr-minio1" "select-5" "" "$dest" ""

	desc 'binlog gap test'
	desc 'create binlog gap backup (will be marked as PITR unready)'
	# The gap scenario writes to test.gap, so the table has to exist before
	# the backup is taken.
	run_mysql \
		"CREATE DATABASE IF NOT EXISTS test; use test; CREATE TABLE IF NOT EXISTS gap (id int PRIMARY KEY);" \
		"-h $proxy -uroot -proot_password"
	run_backup "$cluster" "on-pitr-minio-gap"
	create_binlog_gap
	check_binlog_gap_error
	# Three restore attempts against the gap backup, one per scenario.
	check_binlog_gap_restore "error"
	check_binlog_gap_restore "force"
	check_binlog_gap_restore "no-pitr"
	desc "done binlog gap test"

	invalid_binlog_test

	destroy $namespace
	desc "test passed"
}

# Run the test.
main
